{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "c4419168-c0e6-4a65-b56e-8454c42060ac",
   "metadata": {
    "jp-MarkdownHeadingCollapsed": true,
    "tags": []
   },
   "source": [
    "### 0. Spark Setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "32bd7cdd-8504-4a54-a461-244bf7878d2a",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "3aab2a7e-a685-4925-9c9a-b5adf201af77",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      ":: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Ivy Default Cache set to: /root/.ivy2/cache\n",
      "The jars for the packages stored in: /root/.ivy2/jars\n",
      "org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency\n",
      "org.apache.spark#spark-avro_2.12 added as a dependency\n",
      ":: resolving dependencies :: org.apache.spark#spark-submit-parent-5a3a4db6-be91-4d32-9884-8b0f38241b3f;1.0\n",
      "\tconfs: [default]\n",
      "\tfound org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central\n",
      "\tfound org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central\n",
      "\tfound org.apache.kafka#kafka-clients;2.8.1 in central\n",
      "\tfound org.lz4#lz4-java;1.8.0 in central\n",
      "\tfound org.xerial.snappy#snappy-java;1.1.8.4 in central\n",
      "\tfound org.slf4j#slf4j-api;1.7.32 in central\n",
      "\tfound org.apache.hadoop#hadoop-client-runtime;3.3.2 in central\n",
      "\tfound org.spark-project.spark#unused;1.0.0 in central\n",
      "\tfound org.apache.hadoop#hadoop-client-api;3.3.2 in central\n",
      "\tfound commons-logging#commons-logging;1.1.3 in central\n",
      "\tfound com.google.code.findbugs#jsr305;3.0.0 in central\n",
      "\tfound org.apache.commons#commons-pool2;2.11.1 in central\n",
      "\tfound org.apache.spark#spark-avro_2.12;3.3.1 in central\n",
      "\tfound org.tukaani#xz;1.8 in central\n",
      ":: resolution report :: resolve 544ms :: artifacts dl 11ms\n",
      "\t:: modules in use:\n",
      "\tcom.google.code.findbugs#jsr305;3.0.0 from central in [default]\n",
      "\tcommons-logging#commons-logging;1.1.3 from central in [default]\n",
      "\torg.apache.commons#commons-pool2;2.11.1 from central in [default]\n",
      "\torg.apache.hadoop#hadoop-client-api;3.3.2 from central in [default]\n",
      "\torg.apache.hadoop#hadoop-client-runtime;3.3.2 from central in [default]\n",
      "\torg.apache.kafka#kafka-clients;2.8.1 from central in [default]\n",
      "\torg.apache.spark#spark-avro_2.12;3.3.1 from central in [default]\n",
      "\torg.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 from central in [default]\n",
      "\torg.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 from central in [default]\n",
      "\torg.lz4#lz4-java;1.8.0 from central in [default]\n",
      "\torg.slf4j#slf4j-api;1.7.32 from central in [default]\n",
      "\torg.spark-project.spark#unused;1.0.0 from central in [default]\n",
      "\torg.tukaani#xz;1.8 from central in [default]\n",
      "\torg.xerial.snappy#snappy-java;1.1.8.4 from central in [default]\n",
      "\t---------------------------------------------------------------------\n",
      "\t|                  |            modules            ||   artifacts   |\n",
      "\t|       conf       | number| search|dwnlded|evicted|| number|dwnlded|\n",
      "\t---------------------------------------------------------------------\n",
      "\t|      default     |   14  |   0   |   0   |   0   ||   14  |   0   |\n",
      "\t---------------------------------------------------------------------\n",
      ":: retrieving :: org.apache.spark#spark-submit-parent-5a3a4db6-be91-4d32-9884-8b0f38241b3f\n",
      "\tconfs: [default]\n",
      "\t0 artifacts copied, 14 already retrieved (0kB/8ms)\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23/02/21 21:20:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Setting default log level to \"WARN\".\n",
      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "import pyspark.sql.types as T\n",
    "import pyspark.sql.functions as F\n",
    "\n",
    "# create (or reuse) the session used by every cell below\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"Spark-Notebook\")\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6f4b62fa-b3ce-4a1b-a1f4-2ed332a0d55a",
   "metadata": {
    "jp-MarkdownHeadingCollapsed": true,
    "tags": []
   },
   "source": [
    "### 1. Reading from Kafka Stream\n",
    "\n",
    "through `readStream`"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f491fa45-4471-4bc5-92f7-48081f687140",
   "metadata": {},
   "source": [
    "#### 1.1 Raw Kafka Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "82c25cb2-2599-4f9b-8849-967fbb604a44",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# default for startingOffsets is \"latest\"\n",
    "# NOTE: checkpointLocation is a writeStream option, so it is not set on the reader\n",
    "df_kafka_raw = (\n",
    "    spark.readStream\n",
    "    .format(\"kafka\")\n",
    "    .option(\"kafka.bootstrap.servers\", \"localhost:9092,broker:29092\")\n",
    "    .option(\"subscribe\", \"rides_csv\")\n",
    "    .option(\"startingOffsets\", \"earliest\")\n",
    "    .load()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "d9149ccd-69b2-4f5b-afc0-43567673c634",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- key: binary (nullable = true)\n",
      " |-- value: binary (nullable = true)\n",
      " |-- topic: string (nullable = true)\n",
      " |-- partition: integer (nullable = true)\n",
      " |-- offset: long (nullable = true)\n",
      " |-- timestamp: timestamp (nullable = true)\n",
      " |-- timestampType: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_kafka_raw.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62e5e753-89c7-460f-a8be-16868ce5c680",
   "metadata": {
    "jp-MarkdownHeadingCollapsed": true,
    "tags": []
   },
   "source": [
    "#### 1.2 Encoded Kafka Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "0b745eed-7d74-421e-8e4b-c8343fda4de3",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# cast the binary Kafka key/value payloads to strings\n",
    "df_kafka_encoded = df_kafka_raw.select(\n",
    "    F.col(\"key\").cast(\"string\"),\n",
    "    F.col(\"value\").cast(\"string\"),\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "6839addc-c7c0-4117-8c9c-d2cd59cbf136",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- key: string (nullable = true)\n",
      " |-- value: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_kafka_encoded.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6749c4de-6f80-4b91-b2b8-b2968c761d75",
   "metadata": {},
   "source": [
    "#### 1.3 Structure Streaming DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "ca20ae37-49f0-421f-9859-73fac8d4ca45",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def parse_ride_from_kafka_message(df_raw, schema):\n",
    "    \"\"\"Parse the Kafka `value` payload of a streaming DataFrame into typed columns.\n",
    "\n",
    "    Args:\n",
    "        df_raw: streaming DataFrame read from Kafka; its `value` column is cast\n",
    "            to a string and split on ', '.\n",
    "        schema: iterable of fields (e.g. a StructType) giving, in positional\n",
    "            order, the name and data type of every output column.\n",
    "\n",
    "    Returns:\n",
    "        Streaming DataFrame holding exactly the columns listed in `schema`.\n",
    "\n",
    "    Raises:\n",
    "        AssertionError: if `df_raw` is not a streaming DataFrame.\n",
    "    \"\"\"\n",
    "    assert df_raw.isStreaming is True, \"DataFrame doesn't receive streaming data\"\n",
    "\n",
    "    # split the payload into an array of attribute strings\n",
    "    attributes = F.split(F.col('value').cast('string'), ', ')\n",
    "\n",
    "    # Build every typed column in a single projection: calling withColumn in a\n",
    "    # loop adds one projection per field and inflates the query plan.\n",
    "    return df_raw.select([\n",
    "        attributes.getItem(idx).cast(field.dataType).alias(field.name)\n",
    "        for idx, field in enumerate(schema)\n",
    "    ])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "e1737bd0-146f-4ee2-a70f-a4657af5bbc6",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# (column name, Spark type) pairs, in the order the attributes appear in a message\n",
    "ride_schema = T.StructType([\n",
    "    T.StructField(name, data_type)\n",
    "    for name, data_type in (\n",
    "        (\"vendor_id\", T.IntegerType()),\n",
    "        ('tpep_pickup_datetime', T.TimestampType()),\n",
    "        ('tpep_dropoff_datetime', T.TimestampType()),\n",
    "        (\"passenger_count\", T.IntegerType()),\n",
    "        (\"trip_distance\", T.FloatType()),\n",
    "        (\"payment_type\", T.IntegerType()),\n",
    "        (\"total_amount\", T.FloatType()),\n",
    "    )\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "ae2ce896-f54b-4166-b01f-b5532ab292fe",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "cd848228-97c5-4325-8457-97f35e533cd8",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- vendor_id: integer (nullable = true)\n",
      " |-- tpep_pickup_datetime: timestamp (nullable = true)\n",
      " |-- tpep_dropoff_datetime: timestamp (nullable = true)\n",
      " |-- passenger_count: integer (nullable = true)\n",
      " |-- trip_distance: float (nullable = true)\n",
      " |-- payment_type: integer (nullable = true)\n",
      " |-- total_amount: float (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_rides.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "60277fdc-2797-4b23-9ecf-956b76db5778",
   "metadata": {
    "tags": []
   },
   "source": [
    "### 2 Sink Operation & Streaming Query\n",
    "\n",
    "through `writeStream`\n",
    "\n",
    "---\n",
    "**Output Sinks**\n",
    "- File Sink: stores the output to the directory\n",
    "- Kafka Sink: stores the output to one or more topics in Kafka\n",
    "- Foreach Sink: runs user-defined logic on each output row (`foreach`) or on each micro-batch (`foreachBatch`)\n",
    "- (for debugging) Console Sink, Memory Sink\n",
    "\n",
    "Further details can be found in [Output Sinks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)\n",
    "\n",
    "---\n",
    "There are three types of **Output Modes**:\n",
    "- Complete: The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.\n",
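    "- Append (default): Only the new rows added to the Result Table since the last trigger are written to the sink\n",
    "- Update: Only the rows that were updated in the Result Table since the last trigger are written to the sink\n",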
    "\n",
    "The supported [Output Modes](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) differ based on the set of transformations applied to the streaming data.\n",
    "\n",
    "--- \n",
    "**Triggers**\n",
    "\n",
    "The [trigger settings](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) of a streaming query define the timing of streaming data processing. Spark Structured Streaming processes data in micro-batches, and you can select one of the following options based on your requirements.\n",
    "\n",
    "- default-micro-batch-mode\n",
    "- fixed-interval-micro-batch-mode\n",
    "- one-time-micro-batch-mode\n",
    "- available-now-micro-batch-mode\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "02ca9b08-aa61-46cd-b946-4457ce2cdf5d",
   "metadata": {
    "tags": []
   },
   "source": [
    "#### Console and Memory Sink"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "74c72469-4c37-417c-a866-a1c1ef75ae8b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):\n",
    "    \"\"\"Start a streaming query that writes `df` to the console sink.\n",
    "\n",
    "    Args:\n",
    "        df: streaming DataFrame to write.\n",
    "        output_mode: value passed to `outputMode`.\n",
    "        processing_time: interval passed to `trigger(processingTime=...)`.\n",
    "\n",
    "    Returns:\n",
    "        The query object returned by `start()` (pyspark.sql.streaming.StreamingQuery).\n",
    "    \"\"\"\n",
    "    # NOTE(review): the default 'complete' mode is described in this notebook as\n",
    "    # supported for aggregation queries; pass 'append' for plain projections.\n",
    "    console_writer = df.writeStream.format(\"console\").option(\"truncate\", False)\n",
    "    console_writer = console_writer.outputMode(output_mode)\n",
    "    console_writer = console_writer.trigger(processingTime=processing_time)\n",
    "    return console_writer.start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "id": "d866c7ba-f8e9-475d-830a-50ffb2c5472b",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23/02/21 21:46:12 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-289a958e-f6b6-4b38-a87b-50002d82ec8b. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.\n",
      "23/02/21 21:46:12 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n",
      "23/02/21 21:46:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0-3, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:46:12 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0-3, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
      "23/02/21 21:46:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-4, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:46:13 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-4, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
      "-------------------------------------------\n",
      "Batch: 0\n",
      "-------------------------------------------\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|1        |2020-07-01 00:25:32 |2020-07-01 00:33:39  |1              |1.5          |2           |9.3         |\n",
      "|1        |2020-07-01 00:03:19 |2020-07-01 00:25:43  |1              |9.5          |1           |27.8        |\n",
      "|2        |2020-07-01 00:15:11 |2020-07-01 00:29:24  |1              |5.85         |2           |22.3        |\n",
      "|2        |2020-07-01 00:30:49 |2020-07-01 00:38:26  |1              |1.9          |1           |14.16       |\n",
      "|2        |2020-07-01 00:31:26 |2020-07-01 00:38:02  |1              |1.25         |2           |7.8         |\n",
      "|1        |2020-07-01 00:25:32 |2020-07-01 00:33:39  |1              |1.5          |2           |9.3         |\n",
      "|1        |2020-07-01 00:03:19 |2020-07-01 00:25:43  |1              |9.5          |1           |27.8        |\n",
      "|2        |2020-07-01 00:15:11 |2020-07-01 00:29:24  |1              |5.85         |2           |22.3        |\n",
      "|2        |2020-07-01 00:30:49 |2020-07-01 00:38:26  |1              |1.9          |1           |14.16       |\n",
      "|2        |2020-07-01 00:31:26 |2020-07-01 00:38:02  |1              |1.25         |2           |7.8         |\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "\n",
      "23/02/21 22:11:05 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-5, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 22:11:05 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor-5, groupId=spark-kafka-source-a303026d-ebd2-4fd3-a000-adb99dfea4a9--717872766-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-------------------------------------------\n",
      "Batch: 1\n",
      "-------------------------------------------\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|payment_type|total_amount|\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "|1        |2020-07-01 00:25:32 |2020-07-01 00:33:39  |1              |1.5          |2           |9.3         |\n",
      "|1        |2020-07-01 00:03:19 |2020-07-01 00:25:43  |1              |9.5          |1           |27.8        |\n",
      "|2        |2020-07-01 00:15:11 |2020-07-01 00:29:24  |1              |5.85         |2           |22.3        |\n",
      "|2        |2020-07-01 00:30:49 |2020-07-01 00:38:26  |1              |1.9          |1           |14.16       |\n",
      "|2        |2020-07-01 00:31:26 |2020-07-01 00:38:02  |1              |1.25         |2           |7.8         |\n",
      "+---------+--------------------+---------------------+---------------+-------------+------------+------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# keep a dedicated handle: `write_query` is reassigned by the memory-sink cell further down,\n",
    "# which would otherwise leave this console query running with no way to stop it\n",
    "console_query = sink_console(df_rides, output_mode='append')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "a9bfa73f-a8cc-4988-a8cf-bf31ee6c449c",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def sink_memory(df, query_name, query_template):\n",
    "    \"\"\"Start a memory-sink streaming query and build a SQL query over its table.\n",
    "\n",
    "    Args:\n",
    "        df: streaming DataFrame to write.\n",
    "        query_name: name given to the streaming query; also substituted for\n",
    "            `{table_name}` in `query_template`.\n",
    "        query_template: SQL text containing a `{table_name}` placeholder.\n",
    "\n",
    "    Returns:\n",
    "        Tuple of (started streaming query, DataFrame returned by `spark.sql`).\n",
    "    \"\"\"\n",
    "    memory_writer = df.writeStream.queryName(query_name).format('memory')\n",
    "    streaming_query = memory_writer.start()\n",
    "    sql_text = query_template.format(table_name=query_name)\n",
    "    # uses the notebook-level `spark` session\n",
    "    return streaming_query, spark.sql(sql_text)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "b31d0b76-e917-44e7-a14d-f9ce6901c23a",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "23/02/21 21:31:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b3e2c096-aa06-4083-9cdf-d6f3cf04fc06. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.\n",
      "23/02/21 21:31:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.\n",
      "23/02/21 21:31:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0-1, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:31:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0-1, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-driver-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n",
      "23/02/21 21:31:49 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor-2, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.\n",
      "23/02/21 21:31:49 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor-2, groupId=spark-kafka-source-f07faf6a-cb53-4ec8-bf58-1685d976f432--722858875-executor] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "                                                                                \r"
     ]
    }
   ],
   "source": [
    "query_name = 'vendor_id_counts'\n",
    "query_template = 'select count(distinct(vendor_id)) from {table_name}'\n",
    "write_query, df_vendor_id_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "id": "4ba56111-83bf-4028-ac65-565e0190f310",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<class 'pyspark.sql.streaming.StreamingQuery'>\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "{'message': 'Waiting for data to arrive',\n",
       " 'isDataAvailable': False,\n",
       " 'isTriggerActive': True}"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "print(type(write_query)) # pyspark.sql.streaming.StreamingQuery\n",
    "write_query.status"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "id": "7cc37bda-9cfa-402b-9d42-a6ba5271476b",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------------------+\n",
      "|count(DISTINCT vendor_id)|\n",
      "+-------------------------+\n",
      "|                        2|\n",
      "+-------------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_vendor_id_counts.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "id": "88862ca9-4d89-487e-987f-08a2b9e83efe",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "write_query.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "443d4041-06db-4a4a-89c1-348848cc7ca8",
   "metadata": {
    "tags": []
   },
   "source": [
    "#### Kafka Sink\n",
    "\n",
    "To write stream results to a Kafka topic, the streaming DataFrame must have at least a column named `value`.\n",
    "\n",
    "Therefore, before starting `writeStream` in Kafka format, the DataFrame needs to be reshaped accordingly.\n",
    "\n",
    "More information about the data structure the Kafka sink expects is available [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "id": "8b08a013-d039-41cf-94fd-a1a57571d25f",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):\n",
    "    \"\"\"Shape a DataFrame into the `key`/`value` layout used by the Kafka sink.\n",
    "\n",
    "    Args:\n",
    "        df: DataFrame to convert.\n",
    "        value_columns: columns joined with ', ' into the `value` string.\n",
    "        key_column: optional name of the column used as the message key\n",
    "            (cast to string).\n",
    "\n",
    "    Returns:\n",
    "        DataFrame with `key` and `value` columns, or only `value` when no\n",
    "        `key_column` is given.\n",
    "    \"\"\"\n",
    "    value = F.concat_ws(', ', *value_columns).alias('value')\n",
    "    if not key_column:\n",
    "        # no key requested: selecting a non-existent `key` column would fail\n",
    "        return df.select(value)\n",
    "    key = F.col(key_column).cast('string').alias('key')\n",
    "    return df.select(key, value)\n",
    "    \n",
    "def sink_kafka(df, topic, output_mode='append',\n",
    "               bootstrap_servers='localhost:9092,broker:29092',\n",
    "               checkpoint_location='checkpoint'):\n",
    "    \"\"\"Start a streaming query that writes `df` to a Kafka topic.\n",
    "\n",
    "    Args:\n",
    "        df: streaming DataFrame to write.\n",
    "        topic: name of the destination Kafka topic.\n",
    "        output_mode: value passed to `outputMode`.\n",
    "        bootstrap_servers: comma-separated broker list passed as\n",
    "            `kafka.bootstrap.servers`.\n",
    "        checkpoint_location: directory passed as `checkpointLocation`.\n",
    "\n",
    "    Returns:\n",
    "        The query object returned by `start()`.\n",
    "    \"\"\"\n",
    "    # NOTE(review): queries should not share a checkpoint directory; pass a\n",
    "    # distinct `checkpoint_location` when starting more than one query.\n",
    "    write_query = (\n",
    "        df.writeStream\n",
    "        .format(\"kafka\")\n",
    "        .option(\"kafka.bootstrap.servers\", bootstrap_servers)\n",
    "        .outputMode(output_mode)\n",
    "        .option(\"topic\", topic)\n",
    "        .option(\"checkpointLocation\", checkpoint_location)\n",
    "        .start()\n",
    "    )\n",
    "    return write_query"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e4cb2140-9f2e-4914-b74c-be4c18cdbe8a",
   "metadata": {},
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
